#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "rpc_proto.h"
#include "core.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "corerpc.h"
#include "stor_rpc.h"
#include "../controller/stor_ctl.h"
#include "../controller/volume_ctl.h"
#include "ylog.h"
#include "dbg.h"

typedef enum {
        STOR_RPC_NULL = 500,
        STOR_RPC_MKVOL,
        STOR_RPC_MKVOLWITH,
        STOR_RPC_MKPOOL,
        STOR_RPC_LISTPOOL_OPEN,
        STOR_RPC_LISTPOOL,
        STOR_RPC_LISTPOOL_CLOSE,
        STOR_RPC_EXTENDPOOL,
        STOR_RPC_LOOKUP,
        STOR_RPC_GETPOOL,
        STOR_RPC_GETATTR,
        STOR_RPC_SETATTR,
        STOR_RPC_RMPOOL,
        STOR_RPC_RMVOL,
        STOR_RPC_CLEANUP,
        STOR_RPC_CLEANUP_BH,
        STOR_RPC_ROLLBACK_BH,
        STOR_RPC_FLAT_BH,
        STOR_RPC_RMSNAP_BH,
        STOR_RPC_DISCARD,

        STOR_RPC_MOVE,
        STOR_RPC_LOCALIZE,
        STOR_RPC_STAT,
        STOR_RPC_CHECK_READY,

        STOR_RPC_CONNECTION,
        STOR_RPC_CONNECT,
        STOR_RPC_DISCONNECT,

        STOR_RPC_READ,
        STOR_RPC_WRITE,
        STOR_RPC_UNMAP,
        STOR_RPC_TABLE_READ,
        STOR_RPC_TABLE_WRITE,

        STOR_RPC_RENAME_LOCK,
        STOR_RPC_RENAME_UNLOCK,
        STOR_RPC_RENAME,

        STOR_RPC_XATTR_SET,
        STOR_RPC_XATTR_GET,
        STOR_RPC_XATTR_LIST,
        STOR_RPC_XATTR_REMOVE,

        STOR_RPC_NEWCHUNK,
        STOR_RPC_CHUNK_MIGRATE,
        STOR_RPC_CHUNK_GETINFO,
        STOR_RPC_CHUNK_ALLOCATE,
        STOR_RPC_CHUNK_MOVE,
        STOR_RPC_CHUNK_CHECK,
        STOR_RPC_CHUNK_CHECK_MULTI,
        STOR_RPC_CHUNK_UPDATE,
        STOR_RPC_CHUNK_SET,
        STOR_RPC_CHUNK_CLEANUP,
        STOR_RPC_CHUNK_REJECT,

        STOR_RPC_CHUNK_EXIST,

        STOR_RPC_SNAPSHOT_CREATE,
        STOR_RPC_SNAPSHOT_ROLLBACK,
        STOR_RPC_SNAPSHOT_REMOVE,
        STOR_RPC_SNAPSHOT_PROTECT,
        STOR_RPC_SNAPSHOT_UPDATEPARENT,
        STOR_RPC_SNAPSHOT_SETFROM,
        STOR_RPC_SNAPSHOT_CLEANUP,
        STOR_RPC_SNAPSHOT_LISTOPEN,
        STOR_RPC_SNAPSHOT_LISTCLOSE,
        STOR_RPC_SNAPSHOT_LIST,
        STOR_RPC_SNAPSHOT_READ,
        STOR_RPC_SNAPSHOT_DIFF,
        STOR_RPC_SNAPSHOT_FLAT,
        STOR_RPC_SNAPSHOT_CHECK,
        STOR_RPC_SNAPSHOT_ISEMPTY,

        STOR_RPC_GROUP_SNAPSHOT_LOCK,
        STOR_RPC_GROUP_SNAPSHOT_UNLOCK,
        STOR_RPC_GROUP_SNAPSHOT_CREATE_NOLOCK,
        STOR_RPC_GROUP_SNAPSHOT_REMOVE_NOLOCK,

        STOR_RPC_SNAPSHOT_READ_META,
        STOR_RPC_LOWER_READ,
        STOR_RPC_INTACT_CHECK,

        STOR_RPC_VFM_SET,
        STOR_RPC_VFM_GET,
        STOR_RPC_VFM_STAT,

        STOR_RPC_SNAPSHOT_LAST,
        STOR_RPC_SNAPSHOT_PREV,

        STOR_RPC_MAX,
} node_rpc_op_t;

typedef struct {
        uint32_t op;
        chkid_t fileid;
        uint32_t arg_len;
        int force;
        char buf[0];
} msg_t;

static __request_handler_func__  __request_handler__[STOR_RPC_MAX - STOR_RPC_NULL];
static char  __request_name__[STOR_RPC_MAX - STOR_RPC_NULL][__RPC_HANDLER_NAME__ ];

static void __request_set_handler(int op, __request_handler_func__ func, const char *name)
{
        YASSERT(strlen(name) + 1 < __RPC_HANDLER_NAME__ );
        strcpy(__request_name__[op - STOR_RPC_NULL], name);
        __request_handler__[op - STOR_RPC_NULL] = func;
}

static void __request_get_handler(int op, __request_handler_func__ *func, const char **name)
{
        *func = __request_handler__[op - STOR_RPC_NULL];
        *name = __request_name__[op - STOR_RPC_NULL];
}

static void __getmsg(buffer_t *buf, msg_t **_req, int *buflen, char *_buf)
{
        msg_t *req;

        YASSERT(buf->len <= MEM_CACHE_SIZE4K);

        req = (void *)_buf;
        *buflen = buf->len - sizeof(*req);
        mbuffer_get(buf, req, buf->len);

        *_req = req;
}

STATIC int __stor_ctl_chunk_move(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        const nid_t *dist;
        const int *dist_count;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &dist,
                       NULL, &dist_count, NULL, NULL);

        ret = stor_ctl_chunk_move(&req->fileid, chkid, dist, *dist_count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_check(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, oflags = 0;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        const int *idx, *async, *force;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &idx,
                       NULL, &async, NULL, &force, NULL);

        ret = stor_ctl_chunk_check(&req->fileid, chkid, *idx, *async, *force, &oflags);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &oflags, sizeof(int));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_check_multi(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, *retval;
        msg_t *req;
        nid_t *srcnid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        const int *chknum;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &srcnid, NULL, &chkid, NULL, &chknum, NULL,
                        NULL, NULL);

        ret = ymalloc((void **)&retval, sizeof(int) * (*chknum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = stor_ctl_chunk_check_multi(&req->fileid, chkid, *chknum, srcnid, retval);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

#if 0
        for (int i =0; i < *chknum; i++) {
                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&chkid[i]), retval[i]);
        }
#endif

        rpc_reply(sockid, msgid, retval, sizeof(int) * (*chknum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        yfree((void **)&retval);

        return 0;
err_free:
        yfree((void **)&retval);
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_set(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        const int *status;
        const nid_t *nid;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &nid,
                       NULL, &status, NULL, NULL);

        ret = stor_ctl_chunk_set(&req->fileid, chkid, nid, *status);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_xattr_set(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t valuelen;
        const char *key;
        const void *value;
        int *flag;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &key, NULL,
                       &value, &valuelen, &flag, NULL, NULL);

        ret = stor_ctl_xattr_set(&req->fileid, key, value, valuelen, *flag);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_xattr_get(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *value = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *key;
        int valuelen;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &key, NULL, NULL);

        ret = stor_ctl_xattr_get(&req->fileid, key, value, &valuelen);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, value, valuelen);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, value);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, value);
        return ret;
}

static int __stor_ctl_xattr_list(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        int valuelen = MAX_BUF_LEN;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *value = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        __getmsg(_buf, &req, &buflen, buf);

        ret = stor_ctl_xattr_list(&req->fileid, value, &valuelen);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, value, valuelen);

        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, value);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, value);
        return ret;
}

STATIC int __stor_ctl_xattr_remove(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *key;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &key, NULL, NULL);

        ret = stor_ctl_xattr_remove(&req->fileid, key);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_table_read(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        uint32_t *size;
        buffer_t reply;
        uint64_t *offset;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &size, NULL, &offset, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = stor_ctl_table_read(&req->fileid, chkid, &reply, *size, *offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_table_write(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        uint32_t *size, buflen;
        uint64_t *offset;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->arg_len;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &size, NULL, &offset, NULL, NULL);

        ret = stor_ctl_table_write(&req->fileid, chkid, _buf, *size, *offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_flat(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        int *idx, *force;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &idx, NULL, &force, NULL, NULL);

        ret = volume_ctl_snapshot_flat(&req->fileid, *idx, *force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_mkvol(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        const char *site_name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;
        setattr_t *setattr;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &site_name, NULL, &setattr, NULL, NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_mkvol(&req->fileid, name, site_name, setattr, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_mkpool(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        const char *site_name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;
        setattr_t *setattr;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &site_name, NULL, &setattr, NULL, NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_mkpool(&req->fileid, name, site_name, setattr, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_listpool_open(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *uuid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &uuid, NULL, NULL);

        ret = stor_ctl_listpool_open(&req->fileid, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_listpool(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, delen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *de;
        const char *uuid;
        uint64_t *offset;

        ANALYSIS_BEGIN(0);

        ret = ymalloc((void **)&de, BIG_BUF_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &offset, NULL, &uuid, NULL, NULL);

        delen = BIG_BUF_LEN;
        ret = stor_ctl_listpool(&req->fileid, uuid, *offset, de, &delen);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_free, ret);
        }

        rpc_reply(sockid, msgid, de, delen);

        ANALYSIS_END(0, IO_WARN, NULL);

        yfree((void **)&de);
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_free:
        yfree((void **)&de);
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_listpool_close(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *uuid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &uuid, NULL, NULL);

        ret = stor_ctl_listpool_close(&req->fileid, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_extend_pool(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, NULL);

        ret = stor_ctl_extend_pool(&req->fileid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}
STATIC int __stor_ctl_lookup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_lookup(&req->fileid, name, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_getinfo(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const chkid_t *chkid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;

        ANALYSIS_BEGIN(0);
        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &chkid, NULL, NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_chunk_getinfo(&req->fileid, chkid, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        DBUG("chkid %s %s from %s\n", id2str(chkid), id2str(&chkinfo->id), _inet_ntoa(sockid->addr));
        //CHKINFO_DUMP(chkinfo, D_INFO);
        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_getpool(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char pool[MAX_NAME_LEN];

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        mbuffer_get(_buf, req, _buf->len);

        DBUG("chunk %s %p\n", id2str(&req->fileid), &req->fileid);
        ret = stor_ctl_getpool(&req->fileid, pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, pool, strlen(pool) + 1);

        ANALYSIS_END(0, IO_WARN, id2str(&req->fileid));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_getattr(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        fileinfo_t fileinfo;
        setattr_t *setattr;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &setattr, NULL, NULL);

        DBUG("chunk %s %p\n", id2str(&req->fileid), &req->fileid);
        ret = stor_ctl_getattr(&req->fileid, &fileinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &fileinfo, sizeof(fileinfo));

        ANALYSIS_END(0, IO_WARN, id2str(&req->fileid));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_setattr(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        fileinfo_t fileinfo;
        setattr_t *setattr;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &setattr, NULL, NULL);

        ret = stor_ctl_setattr(&req->fileid, &fileinfo, setattr);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &fileinfo, sizeof(fileinfo));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_allocate(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf;
        const chkid_t *chkid;
        int *fill, *chknum;

        ANALYSIS_BEGIN(0);

        ret = ymalloc((void **)&buf, _buf->len);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &chknum, NULL, &fill, NULL, NULL);

        YASSERT(*chknum);
        YASSERT(chkid);
        ret = stor_ctl_chunk_allocate(&req->fileid, chkid, *chknum, *fill);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        yfree((void **)&buf);

        return 0;
err_ret:
        yfree((void **)&buf);
        return ret;
}

STATIC int __stor_ctl_chunk_migrate(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;
        const chkid_t *chkid;
        uint32_t *force;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &chkid, NULL,
                       &force, NULL,
                       NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_chunk_migrate(&req->fileid, chkid, *force, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_update(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf;
        const chkinfo_t *chkinfo;
        const chkid_t *chkid;
        const nid_t *owner;
        const uint64_t *info_version;

        ANALYSIS_BEGIN(0);

        buf = mem_cache_calloc(MEM_CACHE_4K, 1);
        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &chkid, NULL,
                       &chkinfo, NULL,
                       &owner, NULL,
                       &info_version, NULL,
                       NULL);

        ret = stor_ctl_chunk_update(chkid, chkinfo, owner, *info_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_cleanup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf;
        const chkid_t *chkid;
        const nid_t *nid;
        uint64_t *meta_version;

        ANALYSIS_BEGIN(0);

        buf = mem_cache_calloc(MEM_CACHE_4K, 1);
        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &chkid, NULL,
                       &nid, NULL,
                       &meta_version, NULL,
                       NULL);

        ret = stor_ctl_chunk_cleanup(&req->fileid, chkid, nid, *meta_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_cleanup(const nid_t *nid, const fileid_t *fileid,
                         const chkid_t *chkid, const nid_t *_nid, uint64_t meta_version)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_CLEANUP;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count,
                       chkid, sizeof(*chkid),
                       _nid, sizeof(*_nid),
                       &meta_version, sizeof(meta_version),
                       NULL);

        ret = rpc_request_wait("stor_rpc_chunk_cleanup", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rmpool(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_rmpool(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rmvol(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_rmvol(&req->fileid, name, req->force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rename_lock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, *force;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;
        const fileid_t *src;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &src, NULL, &name, NULL, &force, NULL, NULL);

        ret = stor_ctl_rename_lock(&req->fileid, src, name, *force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rename_lock(const nid_t *nid, const fileid_t *fileid,
                       const fileid_t *src, const char *name, int force)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_RENAME_LOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, src, sizeof(*src),
                       name, strlen(name) + 1,
                       &force, sizeof(force),
                       NULL);

        ret = rpc_request_wait("stor_rpc_rename_lock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rename_unlock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        chkinfo_t *chkinfo;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &chkinfo, NULL, NULL);

        ret = stor_ctl_rename_unlock(&req->fileid, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rename(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const fileid_t *from, *to;
        char *fname, *tname;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &from, NULL,
                       &fname, NULL,
                       &to, NULL,
                       &tname, NULL,
                       NULL);

        ret = stor_ctl_rename(&req->fileid, fname, to, tname);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_move(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *nid;
        int count;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &nid, &count,
                       NULL);

        DINFO("begin move "CHKID_FORMAT"\n", CHKID_ARG(&req->fileid));

        ret = stor_ctl_move(&req->fileid, nid, count / sizeof(*nid));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        DINFO("end move "CHKID_FORMAT"\n", CHKID_ARG(&req->fileid));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_mkvol(const nid_t *nid, const chkid_t *parent, const char *name, const char *site_name,
                  const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_MKVOL;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, name, strlen(name) + 1,
                        site_name, strlen(site_name)+1, setattr, sizeof(*setattr), NULL);

        ret = rpc_request_wait("stor_rpc_mkvol", nid,
                               req, sizeof(*req) + count,
                               chkinfo, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_mkpool(const nid_t *nid, const chkid_t *parent, const char *name,
                 const char*site_name, const setattr_t *setattr, chkinfo_t *chkinfo)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_MKPOOL;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count,
                        name, strlen(name) + 1,
                        site_name, strlen(site_name) + 1,
                        setattr, sizeof(*setattr), NULL);

        ret = rpc_request_wait("stor_rpc_mkpool", nid,
                               req, sizeof(*req) + count,
                               chkinfo, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_listpool_open(const nid_t *nid, const chkid_t *parent, const char *uuid)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_LISTPOOL_OPEN;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_listpool_open", nid,
                        req, sizeof(*req) + count,
                        NULL, NULL,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_listpool(const nid_t *nid, const chkid_t *parent, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_LISTPOOL;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, &offset, sizeof(offset), uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_listpool", nid,
                        req, sizeof(*req) + count,
                        de, delen,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_listpool_close(const nid_t *nid, const chkid_t *parent, const char *uuid)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_LISTPOOL_CLOSE;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_listpool_close", nid,
                        req, sizeof(*req) + count,
                        NULL, NULL,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_extend_pool(const nid_t *nid, const chkid_t *poolid)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_EXTENDPOOL;
        req->fileid = *poolid;
        _opaque_encode(req->buf, &count, NULL);

        ret = rpc_request_wait("stor_rpc_extend_pool", nid,
                        req, sizeof(*req) + count,
                        NULL, NULL,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;

}

int stor_rpc_lookup(const nid_t *nid, const chkid_t *parent, const char *name,
                  chkinfo_t *chkinfo)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_LOOKUP;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_lookup", nid,
                               req, sizeof(*req) + count,
                               chkinfo, NULL,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_getinfo(const nid_t *nid, const chkid_t *parent, const chkid_t *chkid,
                           chkinfo_t *chkinfo)
{
        int ret, replen = MAX_MSG_SIZE;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        YASSERT(chkid->id);

        DBUG("lookup "CHKID_FORMAT" @ "CHKID_FORMAT" from %s\n",
              CHKID_ARG(chkid), CHKID_ARG(parent), network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_GETINFO;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), NULL);

        ret = rpc_request_wait("stor_rpc_chunk_getinfo", nid,
                               req, sizeof(*req) + count,
                               chkinfo, &replen,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));
        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_getpool(const nid_t *nid, const chkid_t *chkid, char *pool)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GETPOOL;
        req->fileid = *chkid;
        YASSERT(chkid->id);
        count = 0;

        ret = rpc_request_wait("stor_rpc_getpool", nid,
                               req, sizeof(*req) + count,
                               pool, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        //YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_getattr(const nid_t *nid, const chkid_t *chkid,  fileinfo_t *fileinfo)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GETATTR;
        req->fileid = *chkid;
        YASSERT(chkid->id);
        count = 0;

        ret = rpc_request_wait("stor_rpc_getattr", nid,
                               req, sizeof(*req) + count,
                               fileinfo, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        //YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_setattr(const nid_t *nid, const chkid_t *chkid,  fileinfo_t *fileinfo, const setattr_t *setattr)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SETATTR;
        req->fileid = *chkid;
        //req->buflen = 0;

        _opaque_encode(req->buf, &count, setattr, sizeof(*setattr), NULL);

        ret = rpc_request_wait("stor_rpc_setattr", nid,
                               req, sizeof(*req) + count,
                               fileinfo, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_table_write(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid,
                const buffer_t *_buf, uint32_t size, uint64_t offset)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(_buf->len <= LICH_CHUNK_SPLIT);
        YASSERT(fileid->type == __VOLUME_CHUNK__);

        DBUG("write chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
              CHKID_ARG(fileid), (LLU)offset, size, network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_TABLE_WRITE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), &size, sizeof(size), &offset, sizeof(offset), NULL);
        req->arg_len = count;

        ret = rpc_request_wait1("stor_rpc_table_write", nid,
                                req, sizeof(*req) + count, _buf,
                                MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_table_read(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid,
                buffer_t *_buf, uint32_t size, uint64_t offset)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(fileid), (LLU)offset, size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), &size, sizeof(size), &offset, sizeof(offset), NULL);
        req->op = STOR_RPC_TABLE_READ;
        req->fileid = *fileid;
        req->arg_len = count;

        ret = rpc_request_wait2("stor_rpc_table_read", nid,
                                req, sizeof(*req) + count, _buf,
                                MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_allocate(const nid_t *nid, const fileid_t *fileid,
                const chkid_t *chkid, int chknum, int fill)
{
        int ret, len;
        char *buf;
        msg_t *req;
        uint32_t count;

        len = (sizeof(*chkid) * chknum + MEM_CACHE_SIZE4K);

        ret = ymalloc((void **)&buf, len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_ALLOCATE;
        req->fileid = *fileid;

        YASSERT(chknum);
        _opaque_encode1(req->buf, len, &count, chkid, sizeof(*chkid) * chknum,
                        &chknum, sizeof(chknum), &fill, sizeof(fill), NULL);

        ret = rpc_request_wait("stor_rpc_chunk_allocate", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_free, ret);

        yfree((void **)&buf);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

#if 0
int stor_rpc_chunk_move(const nid_t *nid, const fileid_t *parent, const chkid_t *chkid,
                      uint32_t op, const reploc_t *dist, int _count, uint32_t force,
                      const nid_t *oper, chkinfo_t *chkinfo)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_MOVE;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count,
                       chkid, sizeof(*chkid),
                       &op, sizeof(op),
                       dist, sizeof(*dist) * _count,
                       &force, sizeof(force),
                       oper, sizeof(*oper),
                       NULL);

        ret = rpc_request_wait("stor_rpc_chunk_move", nid,
                               req, sizeof(*req) + count,
                               chkinfo, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}
#endif

int stor_rpc_chunk_migrate(const nid_t *nid, const fileid_t *parent, const chkid_t *chkid, uint32_t force,
                        chkinfo_t *chkinfo)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_MIGRATE;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count,
                       chkid, sizeof(*chkid),
                       &force, sizeof(force),
                       NULL);

        ret = rpc_request_wait("stor_rpc_chunk_migrate", nid,
                               req, sizeof(*req) + count,
                               chkinfo, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}


int stor_rpc_chunk_update(const nid_t *nid, const fileid_t *parent,
                          const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_UPDATE;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count,
                       parent, sizeof(*parent),
                       chkinfo, CHKINFO_SIZE(chkinfo->repnum),
                       owner, sizeof(*owner),
                       &info_version, sizeof(info_version),
                       NULL);

        ret = rpc_request_wait("stor_rpc_chunk_update", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rmpool(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_RMPOOL;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_rmpool", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rmvol(const nid_t *nid, const fileid_t *fileid,
                const char *name, int force)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_RMVOL;
        req->fileid = *fileid;
        req->force = force;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_rmvol", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_cleanup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_cleanup(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_cleanup(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CLEANUP;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_cleanup", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_cleanup_bh(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_cleanup_bh(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_cleanup_bh(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CLEANUP_BH;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_cleanup_bh", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rollback_bh(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        mbuffer_get(_buf, req, _buf->len);

        ret = stor_ctl_rollback_bh(&req->fileid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rollback_bh(const nid_t *nid, const fileid_t *fileid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_ROLLBACK_BH;
        req->fileid = *fileid;

        count = 0;

        ret = rpc_request_wait("stor_rpc_rollback_bh", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_flat_bh(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        mbuffer_get(_buf, req, _buf->len);

        ret = stor_ctl_flat_bh(&req->fileid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_flat_bh(const nid_t *nid, const fileid_t *fileid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_FLAT_BH;
        req->fileid = *fileid;

        count = 0;

        ret = rpc_request_wait("stor_rpc_flat_bh", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_rmsnap_bh(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const char *name;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_rmsnap_bh(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rmsnap_bh(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        req->op = STOR_RPC_RMSNAP_BH;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_rmsnap_bh", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rename_unlock(const nid_t *nid, const fileid_t *fileid, const chkinfo_t *chkinfo)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_RENAME_UNLOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkinfo, CHKINFO_SIZE(chkinfo->repnum), NULL);

        ret = rpc_request_wait("stor_rpc_rename_unlock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_rename(const nid_t *nid, const fileid_t *from, const char *fname,
                  const fileid_t *to, const char *tname)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_RENAME;
        req->fileid = *from;

        _opaque_encode(req->buf, &count,
                       from, sizeof(*from),
                       fname, strlen(fname) + 1,
                       to, sizeof(*to),
                       tname, strlen(tname) + 1,
                       NULL);

        ret = rpc_request_wait("stor_rpc_rename", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_move(const nid_t *nid, const fileid_t *fileid,
                       const chkid_t *chkid, const nid_t *dist, int dist_count)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_MOVE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), dist,
                       sizeof(*dist)*dist_count, &dist_count, sizeof(dist_count), NULL);

        ret = rpc_request_wait("stor_rpc_chunk_move", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_check(const nid_t *nid, const fileid_t *fileid,
                         const chkid_t *chkid, int idx, int async, int force, int *oflags)
{
        int ret, _oflags = 0, replen = sizeof(int);
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        if (oflags == NULL)
                oflags = &_oflags;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_CHECK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), &idx,
                       sizeof(idx), &async, sizeof(async), &force, sizeof(force), NULL);

        ret = rpc_request_wait("stor_rpc_chunk_check", nid,
                               req, sizeof(*req) + count,
                               oflags, &replen,
                               MSG_FS, SCHEDULE_PRIORITY1, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_check_multi(const nid_t *nid, const fileid_t *fileid,
                         const chkid_t *chkid, int chknum, int *retval)
{
        int ret, timeout;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;
        nid_t srcnid = *net_getnid();

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_CHECK_MULTI;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, &srcnid, sizeof(nid_t),
                        chkid, sizeof(chkid_t) * chknum,
                        &chknum, sizeof(chknum),
                        NULL);

        timeout = _get_long_rpc_timeout();

        ret = rpc_request_wait("stor_rpc_chunk_check", nid,
                               req, sizeof(*req) + count,
                               retval, NULL,
                               MSG_FS, 0, timeout);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        for (int i =0; i < chknum; i++) {
            DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&chkid[i]), retval[i]);
        }
#endif

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_set(const nid_t *nid, const fileid_t *fileid,
                     const chkid_t *chkid, const nid_t *diskid, int status)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_SET;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), diskid,
                       sizeof(*diskid), &status, sizeof(status), NULL);

        ret = rpc_request_wait("stor_rpc_chunk_set", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_read(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        buffer_t reply;
        const io_t *io;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &io, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = volume_ctl_read(io, &reply, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        corerpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_read(const nid_t *nid, const io_t *io, buffer_t *_buf)//, uint32_t size, uint64_t offset)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, io, sizeof(*io), NULL);
        req->op = STOR_RPC_READ;
        req->fileid = io->id;
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_read", nid,
                               req, sizeof(*req) + count, NULL,
                               _buf, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                //UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_write(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t buflen;
        const io_t *io;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->arg_len;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &io, NULL, NULL);

        ret = volume_ctl_write(io, _buf, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        corerpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_write(const nid_t *nid, const io_t *io, const buffer_t *_buf)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(io->id.type == __VOLUME_CHUNK__);

        DBUG("write chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
              CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_WRITE;
        req->fileid = io->id;
        _opaque_encode(req->buf, &count, io, sizeof(*io), NULL);
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_write", nid,
                               req, sizeof(*req) + count, _buf,
                               NULL, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_unmap(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t buflen;
        const io_t *io;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->arg_len;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &io, NULL, NULL);

        ret = volume_ctl_unmap(io, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        corerpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_unmap(const nid_t *nid, const io_t *io)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(io->id.type == __VOLUME_CHUNK__);

        DBUG("write chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
              CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_UNMAP;
        req->fileid = io->id;
        _opaque_encode(req->buf, &count, io, sizeof(*io), NULL);
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_unmap", nid,
                               req, sizeof(*req) + count, NULL,
                               NULL, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_xattr_set(const nid_t *nid, const fileid_t *fileid, const char *key,
                     const char *value, uint32_t valuelen, int flag)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        (void) valuelen;

        YASSERT(strlen(value) + 1 + strlen(key) + 1 < MAX_MSG_SIZE);
        YASSERT(strcmp(key, STORAGE_AREA_KEY));

        req = (void *)buf;
        req->op = STOR_RPC_XATTR_SET;
        req->fileid = *fileid;
        _opaque_encode(req->buf, &count, key, strlen(key) + 1, value,
                       strlen(value) + 1, &flag, sizeof(flag), NULL);

        ret = rpc_request_wait("stor_rpc_xattr_set", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_xattr_get(const nid_t *nid, const fileid_t *fileid, const char *key,
                     char *value, int *valuelen)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_XATTR_GET;
        req->fileid = *fileid;
        _opaque_encode(req->buf, &count, key, strlen(key) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_xattr_get", nid,
                               req, sizeof(*req) + count,
                               value, valuelen,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_xattr_list(const nid_t *nid, const fileid_t *fileid, char *value, int *valuelen)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count = 0;

        req = (void *)buf;
        req->op = STOR_RPC_XATTR_LIST;
        req->fileid = *fileid;

        ret = rpc_request_wait("stor_rpc_xattr_list", nid,
                               req, sizeof(*req) + count,
                               value, valuelen,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_xattr_remove(const nid_t *nid, const fileid_t *fileid, const char *key)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_XATTR_REMOVE;
        req->fileid = *fileid;
        _opaque_encode(req->buf, &count, key, strlen(key) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_xattr_remove", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_move(const nid_t *nid, const fileid_t *fileid, const nid_t *dist, int dist_count)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_MOVE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count,
                       dist, sizeof(*dist) * dist_count,
                       NULL);

        ret = rpc_request_wait("stor_rpc_move", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_localize(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        int *idx;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &idx, NULL,
                       NULL);

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        ret = stor_ctl_localize(&req->fileid, *idx);
        if (ret) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_localize(const nid_t *nid, const fileid_t *fileid, int idx)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_LOCALIZE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count,
                       &idx, sizeof(idx),
                       NULL);

        ret = rpc_request_wait("stor_rpc_localize", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_stat(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        off_t *off;
        size_t *size;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        filestat_t filestat;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        __getmsg(_buf, &req, &buflen, buf);
        _opaque_decode(req->buf, buflen, &off, NULL,
                                &size, NULL, NULL);

        ret = stor_ctl_stat(&req->fileid, &filestat, *off, *size);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &filestat, sizeof(filestat));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_stat(const nid_t *nid, const fileid_t *fileid, filestat_t *filestat, off_t off, size_t size)
{
        int ret, replen = sizeof(filestat_t);
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_STAT;
        req->fileid = *fileid;
        count = 0;

        _opaque_encode(req->buf, &count, &off, sizeof(off_t), &size, sizeof(size_t), NULL);

        ret = rpc_request_wait("stor_rpc_stat", nid,
                               req, sizeof(*req) + count,
                               filestat, &replen,
                               MSG_FS, 0, _get_timeout() * 4);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_check_ready(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        ret = stor_ctl_check_ready(&req->fileid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_check_ready(const nid_t *nid, const fileid_t *fileid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;

        req = (void *)buf;
        req->op = STOR_RPC_CHECK_READY;
        req->fileid = *fileid;

        ret = rpc_request_wait("stor_rpc_check_ready", nid,
                               req, sizeof(*req),
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout() * 4);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_connection(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        void *list = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);;
        int count;
        const nid_t *peer;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);
        _opaque_decode(req->buf, buflen, &peer, NULL);

        ret = stor_ctl_connection(&req->fileid, peer, list, &count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, list, count);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, list);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        mem_cache_free(MEM_CACHE_4K, list);
        return ret;
}

int stor_rpc_connection(const nid_t *nid, const fileid_t *fileid, void *list, int *_count)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CONNECTION;
        req->fileid = *fileid;
        count = 0;

        _opaque_encode(req->buf, &count, nid, sizeof(*nid), NULL);

        ret = rpc_request_wait("stor_rpc_connection", nid,
                               req, sizeof(*req) + count,
                               list, _count,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_connect(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *peer;
        const char *addr;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);
        _opaque_decode(req->buf, buflen, &peer, NULL,
                                &addr, NULL, NULL);

        ret = stor_ctl_connect(&req->fileid, peer, addr);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_connect(const nid_t *nid, const fileid_t *fileid,
                        const nid_t *peer, const char *addr)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CONNECT;
        req->fileid = *fileid;
        count = 0;

        _opaque_encode(req->buf, &count, peer, sizeof(*peer),
                        addr, strlen(addr)+1, NULL);

        ret = rpc_request_wait("stor_rpc_connect", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_disconnect(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *peer;
        const char *addr;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);
        _opaque_decode(req->buf, buflen, &peer, NULL, &addr, NULL, NULL);

        ret = stor_ctl_disconnect(&req->fileid, peer, addr);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_disconnect(const nid_t *nid, const fileid_t *fileid, const nid_t *peer, const char *addr)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_DISCONNECT;
        req->fileid = *fileid;
        count = 0;

        _opaque_encode(req->buf, &count, peer, sizeof(*peer),
                        addr, strlen(addr)+1, NULL);

        ret = rpc_request_wait("stor_rpc_disconnect", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_check(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_snapshot_check(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_check(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_CHECK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_check", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_isempty(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        int empty = 0;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        ret = stor_ctl_snapshot_isempty(&req->fileid, &empty);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &empty, sizeof(int));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_isempty(const nid_t *nid, const fileid_t *fileid, int *empty)
{
        int ret, replen = sizeof(int);
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_ISEMPTY;
        req->fileid = *fileid;

        ret = rpc_request_wait("stor_rpc_snapshot_isempty", nid,
                               req, sizeof(*req),
                               empty, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_group_snapshot_lock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uuid_t uuid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, NULL);

        ret = stor_ctl_group_snapshot_lock(&req->fileid, &uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &uuid, sizeof(uuid_t));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_group_snapshot_lock(const nid_t *nid, const fileid_t *fileid, uuid_t *_uuid)
{
        int ret, reqlen = sizeof(uuid_t);
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GROUP_SNAPSHOT_LOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, NULL);

        ret = rpc_request_wait("stor_rpc_group_snapshot_lock", nid,
                               req, sizeof(*req) + count,
                               _uuid, &reqlen,
                               MSG_FS, 0, 60);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_group_snapshot_unlock(const sockid_t *sockid,
                                            const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uuid_t *uuid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &uuid, NULL, NULL);

        YASSERT(!uuid_is_null(*uuid));
        ret = stor_ctl_group_snapshot_unlock(&req->fileid, *uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_group_snapshot_unlock(const nid_t *nid, const fileid_t *fileid, const uuid_t uuid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GROUP_SNAPSHOT_UNLOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, uuid, sizeof(uuid_t), NULL);
        YASSERT(!uuid_is_null(uuid));

        ret = rpc_request_wait("stor_rpc_group_snapshot_unlock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_group_snapshot_create_nolock(const sockid_t *sockid,
                                                   const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name, *_site;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uuid_t *uuid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &_site, NULL, &uuid, NULL, NULL);

        YASSERT(!uuid_is_null(*uuid));
        ret = stor_ctl_group_snapshot_create_nolock(&req->fileid, name, _site, *uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_group_snapshot_create_nolock(const nid_t *nid, const fileid_t *fileid,
                const char *name, const char *_site, const uuid_t uuid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GROUP_SNAPSHOT_CREATE_NOLOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1,
                       _site, strlen(_site) + 1, uuid, sizeof(uuid_t), NULL);

        ret = rpc_request_wait("stor_rpc_group_snapshot_create_nolock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, 60);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_group_snapshot_remove_nolock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uuid_t *uuid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &uuid, NULL, NULL);

        YASSERT(!uuid_is_null(*uuid));
        ret = stor_ctl_group_snapshot_remove_nolock(&req->fileid, name, *uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_group_snapshot_remove_nolock(const nid_t *nid, const fileid_t *fileid, const char *name, const uuid_t uuid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_GROUP_SNAPSHOT_REMOVE_NOLOCK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, uuid, sizeof(uuid_t), NULL);

        ret = rpc_request_wait("stor_rpc_group_snapshot_remove_nolock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_create(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        int *p;
        msg_t *req;
        const char *name, *_site;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &p, NULL, &_site, NULL, NULL);

        ret = stor_ctl_snapshot_create(&req->fileid, name, *p, _site);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_create(const nid_t *nid, const fileid_t *fileid, const char *name, int p, const char *_site)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_CREATE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, &p, sizeof(p), _site, strlen(_site)+1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_create", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_listopen(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *uuid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &uuid, NULL, NULL);

        ret = stor_ctl_snapshot_listopen(&req->fileid, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_listopen(const nid_t *nid, const chkid_t *parent, const char *uuid)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_LISTOPEN;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_listopen", nid,
                        req, sizeof(*req) + count,
                        NULL, NULL,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_listclose(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *uuid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &uuid, NULL, NULL);

        ret = stor_ctl_snapshot_listclose(&req->fileid, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_listclose(const nid_t *nid, const chkid_t *parent, const char *uuid)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_LISTCLOSE;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_listclose", nid,
                        req, sizeof(*req) + count,
                        NULL, NULL,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_list(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, delen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *de;
        const char *uuid;
        uint64_t *offset;

        ANALYSIS_BEGIN(0);

        ret = ymalloc((void **)&de, BIG_BUF_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &offset, NULL, &uuid, NULL, NULL);

        delen = BIG_BUF_LEN;
        ret = stor_ctl_snapshot_list(&req->fileid, uuid, *offset, de, &delen);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_free, ret);
        }

        rpc_reply(sockid, msgid, de, delen);

        ANALYSIS_END(0, IO_WARN, NULL);

        yfree((void **)&de);
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_free:
        yfree((void **)&de);
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_list(const nid_t *nid, const fileid_t *parent, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_LIST;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, &offset, sizeof(offset), uuid, strlen(uuid) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_list", nid,
                        req, sizeof(*req) + count,
                        de, delen,
                        MSG_FS, 0, _get_timeout());
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_rollback(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_snapshot_rollback(&req->fileid, name);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_rollback(const nid_t *nid, const fileid_t *fileid, const char *name)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_ROLLBACK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_rollback", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_remove(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        const int *force;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &force, NULL, NULL);

        ret = stor_ctl_snapshot_remove(&req->fileid, name, *force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_remove(const nid_t *nid, const fileid_t *fileid, const char *name, int force)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_REMOVE;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, &force, sizeof(force), NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_remove", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_protect(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        snap_protect_param_t *on;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &on, NULL, NULL);

        ret = stor_ctl_snapshot_protect(&req->fileid, *on);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_protect(const nid_t *nid, const fileid_t *fileid, const snap_protect_param_t on)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_PROTECT;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, &on, sizeof(on), NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_protect", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_updateparent(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        const uint64_t *from;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &from, NULL, NULL);

        ret = stor_ctl_snapshot_updateparent(&req->fileid, name, *from);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_updateparent(const nid_t *nid, const fileid_t *fileid, const char *name, const uint64_t from)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_UPDATEPARENT;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, name, strlen(name) + 1, &from, sizeof(from), NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_updateparent", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_setfrom(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        uint64_t *from;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &from, NULL, NULL);

        ret = stor_ctl_snapshot_setfrom(&req->fileid, *from);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_setfrom(const nid_t *nid, const fileid_t *fileid, const uint64_t from)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_SETFROM;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, &from, sizeof(from), NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_setfrom", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_last(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *rep = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t replen;
        nid_t snapnid;
        fileid_t snapid;
        char snapname[MAX_NAME_LEN];
        uint64_t snap_version;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        ret = stor_ctl_snapshot_last(&req->fileid, &snapnid, &snapid, snapname, &snap_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        _opaque_encode(rep, &replen, &snapnid, sizeof(snapnid), &snapid, sizeof(snapid),
                        snapname, strlen(snapname) + 1, &snap_version, sizeof(snap_version), NULL);

        rpc_reply(sockid, msgid, rep, replen);

        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_last(const nid_t *nid, const fileid_t *fileid, nid_t *snapnid, fileid_t *snapid,
                char *snapname, uint64_t *snap_version)
{
        int ret, replen = PAGE_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *rep = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        msg_t *req;
        uint32_t count;
        nid_t *repnid;
        fileid_t *repid;
        char *repname;
        uint64_t *rep_version;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_LAST;
        req->fileid = *fileid;
        count = 0;

        ret = rpc_request_wait("stor_rpc_snapshot_last", nid,
                               req, sizeof(*req) + count,
                               rep, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(rep, replen, &repnid, NULL, &repid, NULL, &repname, NULL, &rep_version, NULL, NULL);
        *snapnid = *repnid;
        *snapid = *repid;
        strcpy(snapname, repname);
        *snap_version = *rep_version;

        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_prev(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *rep = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t replen;
        const fileid_t *snapid;
        fileid_t previd;
        char snapname[MAX_NAME_LEN];
        uint64_t snap_version;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &snapid, NULL, NULL);

        ret = stor_ctl_snapshot_prev(&req->fileid, snapid, &previd, snapname, &snap_version);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        _opaque_encode(rep, &replen, &previd, sizeof(previd),
                        snapname, strlen(snapname) + 1, &snap_version, sizeof(snap_version), NULL);

        rpc_reply(sockid, msgid, rep, replen);

        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_prev(const nid_t *nid, const fileid_t *fileid, const fileid_t *snapid,
                fileid_t *previd, char *name, uint64_t *snap_version)
{
        int ret, replen = PAGE_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char *rep = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        fileid_t *repid;
        char *repname;
        uint64_t *rep_version;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_PREV;
        req->fileid = *fileid;
        _opaque_encode(req->buf, &count, snapid, sizeof(*snapid), NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_prev", nid,
                               req, sizeof(*req) + count,
                               rep, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(rep, replen, &repid, NULL, &repname, NULL, &rep_version, NULL, NULL);
        *previd = *repid;
        strcpy(name, repname);
        *snap_version = *rep_version;

        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, rep);
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

#if 0
STATIC int __stor_ctl_snapshot_cleanup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        (void) buflen;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        //_opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = stor_ctl_snapshot_cleanup(&req->fileid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_cleanup(const nid_t *nid, const fileid_t *fileid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_SNAPSHOT_CLEANUP;
        req->fileid = *fileid;

        count = 0;
        //_opaque_encode(req->buf, &count, name, strlen(name) + 1, NULL);

        ret = rpc_request_wait("stor_rpc_snapshot_cleanup", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}
#endif

static void __request_handler(void *arg)
{
        int ret;
        msg_t req;
        sockid_t sockid;
        msgid_t msgid;
        buffer_t buf;
        __request_handler_func__ handler;
        const char *name;

        request_trans(arg, &sockid, &msgid, &buf, NULL);

        if (buf.len < sizeof(req)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        mbuffer_get(&buf, &req, sizeof(req));

        DBUG("new job op %u\n", req.op);

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        //handler = __request_handler__[req.op - STOR_RPC_MAX];
        __request_get_handler(req.op, &handler, &name);
        if (handler == NULL) {
                ret = ENOSYS;
                DWARN("error op %u\n", req.op);
                GOTO(err_ret, ret);
        }

        schedule_task_setname(name);

        ret = handler(&sockid, &msgid, &buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_free(&buf);

        return ;
err_ret:
        mbuffer_free(&buf);
        if (sockid.type == SOCKID_CORENET) {
                corerpc_reply_error(&sockid, &msgid, ret);
        } else {
                rpc_reply_error(&sockid, &msgid, ret);
        }
        return;
}

STATIC int __stor_ctl_mkvolwith(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        const char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkinfo_t *chkinfo;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &name, NULL, &chkinfo, NULL, NULL);

        ret = stor_ctl_mkvolwith(&req->fileid, name, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_mkvolwith(const nid_t *nid, const chkid_t *parent, const char *name,
                  const chkinfo_t *chkinfo)
{
        int ret;
        uint32_t count;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = STOR_RPC_MKVOLWITH;
        req->fileid = *parent;
        _opaque_encode(req->buf, &count, name, strlen(name) + 1, chkinfo,
                       CHKINFO_SIZE(chkinfo->repnum), NULL);

        ret = rpc_request_wait("stor_rpc_mkvolwith", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_read(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        buffer_t reply;
        io_t *io;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &io, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = volume_ctl_snapshot_read(&req->fileid, io, &reply);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        corerpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

STATIC int __stor_ctl_snapshot_read_meta(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        buffer_t reply;
        io_t *io;
        const char *snap;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &io, NULL, &snap, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = volume_ctl_snapshot_read_meta(&req->fileid, io, snap, &reply);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        corerpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_lower_read(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        buffer_t reply;
        io_t *io;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &io, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = volume_ctl_lower_read(&req->fileid, io, &reply);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        corerpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_snapshot_diff(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t *size;
        buffer_t reply;
        uint64_t *offset;
        const fileid_t *fileid, *snapdst;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &fileid, NULL, &snapdst, NULL, &size, NULL, &offset, NULL, NULL);

        mbuffer_init(&reply, 0);
        ret = volume_ctl_snapshot_diff(&req->fileid, fileid, snapdst, &reply, *size, *offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        corerpc_reply1(sockid, msgid, &reply);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_read(const nid_t *nid, const fileid_t *parent, const io_t *io, buffer_t *_buf)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("snapshot_read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, io, sizeof(*io), NULL);
        req->op = STOR_RPC_SNAPSHOT_READ;
        req->fileid = *parent;
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_snap_read", nid,
                               req, sizeof(*req) + count, NULL,
                               _buf, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_read_meta(const nid_t *nid, const fileid_t *parent, const io_t *io, const char *snap, buffer_t *_buf)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("snapshot_read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, io, sizeof(*io), snap, strlen(snap)+1, NULL);
        req->op = STOR_RPC_SNAPSHOT_READ_META;
        req->fileid = *parent;
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_snap_read_meta", nid,
                               req, sizeof(*req) + count, NULL,
                               _buf, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_lower_read(const nid_t *nid, const fileid_t *parent, const io_t *io, buffer_t *_buf)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("snapshot_read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, io, sizeof(*io), NULL);
        req->op = STOR_RPC_LOWER_READ;
        req->fileid = *parent;
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_lower_read", nid,
                               req, sizeof(*req) + count, NULL,
                               _buf, MSG_FS, io->size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_flat(const nid_t *nid, const fileid_t *fileid, int idx, int force)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        _opaque_encode(req->buf, &count, &idx, sizeof(idx), &force, sizeof(force), NULL);
        req->op = STOR_RPC_SNAPSHOT_FLAT;
        req->fileid = *fileid;

        ret = rpc_request_wait("stor_rpc_snap_flat", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_snapshot_diff(const nid_t *nid, const fileid_t *parent, const fileid_t *fileid,
                const fileid_t *snapdst, buffer_t *_buf, uint32_t size, uint64_t offset)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        DBUG("snapshot_read chunk "CHKID_FORMAT", offset %llu, size %u at %s\n",
             CHKID_ARG(fileid), (LLU)offset, size, network_rname(nid));

        req = (void *)buf;
        _opaque_encode(req->buf, &count, fileid, sizeof(*fileid), snapdst, sizeof(*snapdst),
                        &size, sizeof(size), &offset, sizeof(offset), NULL);
        req->op = STOR_RPC_SNAPSHOT_DIFF;
        req->fileid = *parent;
        req->arg_len = count;

        ret = corerpc_postwait("stor_rpc_snap_diff", nid,
                               req, sizeof(*req) + count, NULL,
                               _buf, MSG_FS, size, _get_timeout());
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                UNIMPLEMENTED(__WARN__);
                //corenet_close();
                GOTO(err_ret, ret);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_newchunk(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t buflen;
        const chkid_t *chkid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->arg_len;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &chkid, NULL, NULL);

        ret = volume_ctl_newchunk(&req->fileid, chkid, _buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_newchunk(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid,
                        const buffer_t *_buf)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(_buf->len <= LICH_CHUNK_SPLIT);
        YASSERT(fileid->type == __VOLUME_CHUNK__ || fileid->type == __POOL_CHUNK__);

        DBUG("newchunk chunk "CHKID_FORMAT" @ %s\n",
             CHKID_ARG(chkid), network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_NEWCHUNK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), NULL);
        req->arg_len = count;

        ret = rpc_request_wait1("stor_rpc_newchunk", nid,
                                req, sizeof(*req) + count, _buf,
                                MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_exist(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, exist;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t buflen;
        const chkid_t *chkid;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->arg_len;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &chkid, NULL, NULL);

        ret = volume_ctl_chunk_exist(&req->fileid, chkid, &exist);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &exist, sizeof(exist));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_exist(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid, int *exist)
{
        int ret, buflen;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(fileid->type == __VOLUME_CHUNK__ || fileid->type == __POOL_CHUNK__);

        DBUG("check chunk "CHKID_FORMAT" @ %s\n",
             CHKID_ARG(chkid), network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_EXIST;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), NULL);
        req->arg_len = count;

        buflen = sizeof(*exist);
        ret = rpc_request_wait("stor_rpc_chunk_exist", nid,
                               req, sizeof(*req) + count, exist, &buflen,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_discard(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, NULL);

        ret = volume_ctl_discard(&req->fileid, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_discard(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        YASSERT(fileid->type == __VOLUME_CHUNK__ || fileid->type == __POOL_CHUNK__);

        DBUG("discard chunk "CHKID_FORMAT" @ %s\n",
             CHKID_ARG(chkid), network_rname(nid));

        req = (void *)buf;
        req->op = STOR_RPC_DISCARD;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), NULL);
        req->arg_len = count;

        ret = rpc_request_wait("stor_rpc_discard", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout() / 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_chunk_reject(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        char tmp[CHKSTAT_MAX];
        chkinfo_t *chkinfo;
        const chkid_t *chkid;
        const nid_t *bad;

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen,
                       &chkid, NULL,
                       &bad, NULL,
                       NULL);

        chkinfo = (void *)tmp;
        ret = stor_ctl_chunk_reject(&req->fileid, chkid, bad, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, chkinfo,
                  CHKINFO_SIZE(chkinfo->repnum));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_chunk_reject(const nid_t *nid, const fileid_t *parent, const chkid_t *chkid,
                          const nid_t *bad, chkinfo_t *chkinfo)
{
        int ret, replen = MAX_MSG_SIZE;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_CHUNK_REJECT;
        req->fileid = *parent;

        _opaque_encode(req->buf, &count,
                       chkid, sizeof(*chkid),
                       bad, sizeof(*bad),
                       NULL);

        ret = rpc_request_wait("stor_rpc_chunk_reject", nid,
                               req, sizeof(*req) + count,
                               chkinfo, &replen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(replen == (int)CHKINFO_SIZE(chkinfo->repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_vfm_cleanup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *tid;

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &tid, NULL, NULL);

        ret = stor_ctl_vfm_cleanup(&req->fileid, tid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_vfm_cleanup(const nid_t *nid, const fileid_t *fileid, const chkid_t *tid)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_INTACT_CHECK;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, tid, sizeof(*tid), NULL);

        ret = rpc_request_wait("stor_rpc_vfm_cleanup", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_vfm_set(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        const vfm_t *vfm;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, &vfm,
                       NULL, NULL);

        char tmp[MAX_BUF_LEN];
        vfm_dump(vfm, tmp);
        DWARN("set "CHKID_FORMAT" vfm %s\n", CHKID_ARG(chkid), tmp);
        
        ret = stor_ctl_vfm_set(&req->fileid, chkid, vfm);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_vfm_set(const nid_t *nid, const fileid_t *fileid,
                     const chkid_t *chkid, const vfm_t *vfm)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;

        req = (void *)buf;
        req->op = STOR_RPC_VFM_SET;
        req->fileid = *fileid;

        char tmp[MAX_BUF_LEN];
        vfm_dump(vfm, tmp);
        DBUG("set "CHKID_FORMAT" vfm %s\n", CHKID_ARG(chkid), tmp);
        
        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid),
                       vfm, VFM_SIZE(vfm->count), NULL);

        ret = rpc_request_wait("stor_rpc_vfm_set", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_vfm_get(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *chkid;
        vfm_t *vfm;
        char tmp[MAX_BUF_LEN];

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &chkid, NULL, NULL);

        DBUG("vfm get "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        
        vfm = (void *)tmp;
        ret = stor_ctl_vfm_get(&req->fileid, chkid, vfm);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, vfm, VFM_SIZE(vfm->count));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_vfm_get(const nid_t *nid, const fileid_t *fileid,
                     const chkid_t *chkid, vfm_t *vfm)
{
        int ret, buflen = MAX_BUF_LEN;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;
        uint32_t count;
        
        req = (void *)buf;
        req->op = STOR_RPC_VFM_GET;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &count, chkid, sizeof(*chkid), chkid,
                       sizeof(*chkid), NULL);

        ret = rpc_request_wait("stor_rpc_vfm_get", nid,
                               req, sizeof(*req) + count,
                               vfm, &buflen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __stor_ctl_vfm_stat(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, count = 0;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &req, &buflen, buf);

        DBUG("vol "CHKID_FORMAT"\n", CHKID_ARG(&req->fileid));

        ret = stor_ctl_vfm_stat(&req->fileid, &count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("vol "CHKID_FORMAT" vfm %d\n", CHKID_ARG(&req->fileid), count);

        rpc_reply(sockid, msgid, &count, sizeof(int));

        ANALYSIS_END(0, IO_WARN, NULL);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_vfm_stat(const nid_t *nid, const fileid_t *fileid, int *count)
{
        int ret, buflen = MAX_BUF_LEN;
        uint32_t rbuflen = 0;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;

        req = (void *)buf;
        req->op = STOR_RPC_VFM_STAT;
        req->fileid = *fileid;

        _opaque_encode(req->buf, &rbuflen, NULL);

        DBUG("vol "CHKID_FORMAT"\n", CHKID_ARG(&req->fileid));

        ret = rpc_request_wait("stor_rpc_vfm_stat", nid,
                               req, sizeof(*req) + rbuflen,
                               count, &buflen,
                               MSG_FS, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        DBUG("vol "CHKID_FORMAT" vfm %d\n", CHKID_ARG(&req->fileid), count);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int stor_rpc_init()
{
        __request_set_handler(STOR_RPC_MKVOL, __stor_ctl_mkvol, "stor_ctl_mkvol");
        __request_set_handler(STOR_RPC_MKVOLWITH, __stor_ctl_mkvolwith, "stor_ctl_mkvolwith");
        __request_set_handler(STOR_RPC_CHUNK_MOVE, __stor_ctl_chunk_move, "stor_ctl_chunk_move");
        __request_set_handler(STOR_RPC_CHUNK_CHECK, __stor_ctl_chunk_check, "stor_ctl_chunk_check");
        __request_set_handler(STOR_RPC_CHUNK_CHECK_MULTI, __stor_ctl_chunk_check_multi, "stor_ctl_chunk_check");
        __request_set_handler(STOR_RPC_CHUNK_SET, __stor_ctl_chunk_set, "stor_ctl_chunk_set");

        __request_set_handler(STOR_RPC_VFM_SET, __stor_ctl_vfm_set, "stor_ctl_vfm_set");
        __request_set_handler(STOR_RPC_VFM_GET, __stor_ctl_vfm_get, "stor_ctl_vfm_get");
        __request_set_handler(STOR_RPC_VFM_STAT, __stor_ctl_vfm_stat, "stor_ctl_vfm_stat");

        __request_set_handler(STOR_RPC_XATTR_SET, __stor_ctl_xattr_set, "stor_ctl_xattr_set");
        __request_set_handler(STOR_RPC_XATTR_GET, __stor_ctl_xattr_get, "stor_ctl_xattr_get");
        __request_set_handler(STOR_RPC_XATTR_LIST, __stor_ctl_xattr_list, "stor_ctl_xattr_list");
        __request_set_handler(STOR_RPC_XATTR_REMOVE, __stor_ctl_xattr_remove, "stor_ctl_xattr_remove");
        __request_set_handler(STOR_RPC_READ, __stor_ctl_read, "stor_ctl_read");
        __request_set_handler(STOR_RPC_WRITE, __stor_ctl_write, "stor_ctl_write");
        __request_set_handler(STOR_RPC_UNMAP, __stor_ctl_unmap, "stor_ctl_unmap");
        __request_set_handler(STOR_RPC_TABLE_READ, __stor_ctl_table_read, "stor_ctl_table_read");
        __request_set_handler(STOR_RPC_TABLE_WRITE, __stor_ctl_table_write, "stor_ctl_table_write");
        __request_set_handler(STOR_RPC_NEWCHUNK, __stor_ctl_newchunk, "stor_ctl_newchunk");
        __request_set_handler(STOR_RPC_DISCARD, __stor_ctl_discard, "stor_ctl_discard");
        __request_set_handler(STOR_RPC_MKPOOL, __stor_ctl_mkpool, "stor_ctl_mkpool");
        __request_set_handler(STOR_RPC_LISTPOOL_OPEN, __stor_ctl_listpool_open, "stor_ctl_listpool_open");
        __request_set_handler(STOR_RPC_LISTPOOL, __stor_ctl_listpool, "stor_ctl_listpool");
        __request_set_handler(STOR_RPC_LISTPOOL_CLOSE, __stor_ctl_listpool_close, "stor_ctl_listpool_close");
        __request_set_handler(STOR_RPC_EXTENDPOOL, __stor_ctl_extend_pool, "stor_ctl_extend_pool");
        __request_set_handler(STOR_RPC_LOOKUP, __stor_ctl_lookup, "stor_ctl_lookup");
        __request_set_handler(STOR_RPC_CHUNK_GETINFO, __stor_ctl_chunk_getinfo, "stor_ctl_chunk_getinfo");
        __request_set_handler(STOR_RPC_GETPOOL, __stor_ctl_getpool, "stor_ctl_getpool");
        __request_set_handler(STOR_RPC_GETATTR, __stor_ctl_getattr, "stor_ctl_getattr");
        __request_set_handler(STOR_RPC_SETATTR, __stor_ctl_setattr, "stor_ctl_setattr");
        __request_set_handler(STOR_RPC_CHUNK_ALLOCATE, __stor_ctl_chunk_allocate, "stor_ctl_chunk_allocate");
        __request_set_handler(STOR_RPC_CHUNK_MIGRATE, __stor_ctl_chunk_migrate, "stor_ctl_chunk_migrate");
        __request_set_handler(STOR_RPC_CHUNK_REJECT, __stor_ctl_chunk_reject, "stor_ctl_chunk_reject");
        __request_set_handler(STOR_RPC_CHUNK_UPDATE, __stor_ctl_chunk_update, "stor_ctl_chunk_update");
        __request_set_handler(STOR_RPC_CHUNK_CLEANUP, __stor_ctl_chunk_cleanup, "stor_ctl_chunk_cleanup");
        __request_set_handler(STOR_RPC_CHUNK_EXIST, __stor_ctl_chunk_exist, "stor_ctl_chunk_exist");
        __request_set_handler(STOR_RPC_RMPOOL, __stor_ctl_rmpool, "stor_ctl_rmpool");
        __request_set_handler(STOR_RPC_RMVOL, __stor_ctl_rmvol, "stor_ctl_rmvol");
        __request_set_handler(STOR_RPC_CLEANUP, __stor_ctl_cleanup, "stor_ctl_cleanup");
        __request_set_handler(STOR_RPC_INTACT_CHECK, __stor_ctl_vfm_cleanup, "stor_ctl_vfm_cleanup");
        __request_set_handler(STOR_RPC_CLEANUP_BH, __stor_ctl_cleanup_bh, "stor_ctl_cleanup_bh");
        __request_set_handler(STOR_RPC_ROLLBACK_BH, __stor_ctl_rollback_bh, "stor_ctl_rollback_bh");
        __request_set_handler(STOR_RPC_RMSNAP_BH, __stor_ctl_rmsnap_bh, "stor_ctl_rmsnap_bh");
        __request_set_handler(STOR_RPC_FLAT_BH, __stor_ctl_flat_bh, "stor_ctl_flat_bh");
        __request_set_handler(STOR_RPC_RENAME_LOCK, __stor_ctl_rename_lock, "stor_ctl_rename_lock");
        __request_set_handler(STOR_RPC_RENAME, __stor_ctl_rename, "stor_ctl_rename");
        __request_set_handler(STOR_RPC_LOCALIZE, __stor_ctl_localize, "stor_ctl_localize");
        __request_set_handler(STOR_RPC_STAT, __stor_ctl_stat, "stor_ctl_stat");
        __request_set_handler(STOR_RPC_CHECK_READY, __stor_ctl_check_ready, "stor_ctl_check_ready");
        __request_set_handler(STOR_RPC_CONNECTION, __stor_ctl_connection, "stor_ctl_connection");
        __request_set_handler(STOR_RPC_CONNECT, __stor_ctl_connect, "stor_ctl_connect");
        __request_set_handler(STOR_RPC_DISCONNECT, __stor_ctl_disconnect, "stor_ctl_disconnect");
        __request_set_handler(STOR_RPC_RENAME_UNLOCK, __stor_ctl_rename_unlock, "stor_ctl_rename_unlock");
        __request_set_handler(STOR_RPC_MOVE, __stor_ctl_move, "stor_ctl_move");

        __request_set_handler(STOR_RPC_SNAPSHOT_CREATE, __stor_ctl_snapshot_create, "stor_ctl_snapshot_create");
        __request_set_handler(STOR_RPC_SNAPSHOT_LISTOPEN, __stor_ctl_snapshot_listopen, "stor_ctl_snapshot_listopen");
        __request_set_handler(STOR_RPC_SNAPSHOT_LISTCLOSE, __stor_ctl_snapshot_listclose, "stor_ctl_snapshot_listclose");
        __request_set_handler(STOR_RPC_SNAPSHOT_LIST, __stor_ctl_snapshot_list, "stor_ctl_snapshot_list");
        __request_set_handler(STOR_RPC_SNAPSHOT_READ, __stor_ctl_snapshot_read, "stor_ctl_snapshot_read");
        __request_set_handler(STOR_RPC_SNAPSHOT_READ_META, __stor_ctl_snapshot_read_meta, "stor_ctl_snapshot_read_meta");
        __request_set_handler(STOR_RPC_LOWER_READ, __stor_ctl_lower_read, "stor_ctl_lower_read");
        __request_set_handler(STOR_RPC_SNAPSHOT_FLAT, __stor_ctl_snapshot_flat, "stor_ctl_snapshot_flat");
        __request_set_handler(STOR_RPC_SNAPSHOT_DIFF, __stor_ctl_snapshot_diff, "stor_ctl_snapshot_diff");
        __request_set_handler(STOR_RPC_SNAPSHOT_ROLLBACK, __stor_ctl_snapshot_rollback, "stor_ctl_snapshot_rollback");
        __request_set_handler(STOR_RPC_SNAPSHOT_REMOVE, __stor_ctl_snapshot_remove, "stor_ctl_snapshot_remove");
        __request_set_handler(STOR_RPC_SNAPSHOT_PROTECT, __stor_ctl_snapshot_protect, "stor_ctl_snapshot_protect");
        __request_set_handler(STOR_RPC_SNAPSHOT_UPDATEPARENT, __stor_ctl_snapshot_updateparent, "stor_ctl_snapshot_updateparent");
        __request_set_handler(STOR_RPC_SNAPSHOT_SETFROM, __stor_ctl_snapshot_setfrom, "stor_ctl_snapshot_setfrom");
        __request_set_handler(STOR_RPC_SNAPSHOT_LAST, __stor_ctl_snapshot_last, "stor_ctl_snapshot_last");
        __request_set_handler(STOR_RPC_SNAPSHOT_PREV, __stor_ctl_snapshot_prev, "stor_ctl_snapshot_prev");
        // __request_set_handler(STOR_RPC_SNAPSHOT_CLEANUP, __stor_ctl_snapshot_cleanup, "stor_ctl_snapshot_cleanup");
        __request_set_handler(STOR_RPC_SNAPSHOT_CHECK, __stor_ctl_snapshot_check, "stor_ctl_snapshot_check");
        __request_set_handler(STOR_RPC_SNAPSHOT_ISEMPTY, __stor_ctl_snapshot_isempty, "stor_ctl_snapshot_isempty");

        __request_set_handler(STOR_RPC_GROUP_SNAPSHOT_LOCK,
                        __stor_ctl_group_snapshot_lock, "stor_ctl_group_snapshot_lock");
        __request_set_handler(STOR_RPC_GROUP_SNAPSHOT_UNLOCK,
                        __stor_ctl_group_snapshot_unlock, "stor_ctl_group_snapshot_unlock");
        __request_set_handler(STOR_RPC_GROUP_SNAPSHOT_CREATE_NOLOCK,
                        __stor_ctl_group_snapshot_create_nolock, "stor_ctl_group_snapshot_create_nolock");
        __request_set_handler(STOR_RPC_GROUP_SNAPSHOT_REMOVE_NOLOCK,
                        __stor_ctl_group_snapshot_remove_nolock, "stor_ctl_group_snapshot_remove_nolock");


        rpc_request_register(MSG_FS, __request_handler, NULL);
        corerpc_register(MSG_FS, __request_handler, NULL);

        return 0;
}
